package com.tsq.weixin.provider.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.tsq.web.base.service.impl.BaseServiceImpl;
import com.tsq.weixin.api.config.TaskExcutor;
import com.tsq.weixin.api.model.WxUser;
import com.tsq.weixin.provider.mapper.WxUserMapper;
import com.tsq.weixin.provider.service.WxUserService;
import me.chanjar.weixin.common.error.WxErrorException;
import me.chanjar.weixin.mp.api.WxMpService;
import me.chanjar.weixin.mp.api.WxMpUserService;
import me.chanjar.weixin.mp.bean.result.WxMpUser;
import me.chanjar.weixin.mp.bean.result.WxMpUserList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/**
 * Service for the locally stored WeChat official-account followers ({@link WxUser}).
 *
 * <p>Supports paged lookup, refreshing a single follower or a list of followers from the
 * WeChat API, and a full follower synchronisation that is fanned out through
 * {@link TaskExcutor}.
 */
@Service
public class WxUserServiceImpl extends BaseServiceImpl<WxUserMapper, WxUser> implements WxUserService {

    /** Maximum number of users requested per {@code userInfoList} call. */
    private static final int USER_INFO_BATCH_SIZE = 100;

    /** Maximum number of OpenIDs WeChat returns per {@code userList} call. */
    private static final int USER_LIST_PAGE_SIZE = 10000;

    /** Number of leading characters dropped from an openid when building a log-only batch label. */
    private static final int BATCH_LABEL_OFFSET = 20;

    /**
     * Guards {@link #syncWxUsers(String)} so that only one full synchronisation runs at a time.
     * An atomic flag is used because the check and the set must happen as a single step.
     */
    private static final AtomicBoolean SYNC_WX_USER_TASK_RUNNING = new AtomicBoolean(false);

    Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private WxUserMapper userMapper;
    @Autowired
    private WxMpService wxMpService;

    /**
     * Queries followers page by page.
     *
     * <p>Every non-empty property of {@code wxUser} becomes a filter: {@code appid},
     * {@code openid}, {@code city} and {@code qrSceneStr} are matched exactly,
     * {@code nickname} is matched with {@code LIKE}.
     *
     * @param wxUser carries both the filter values and the paging position (current page, size)
     * @return the requested page of followers
     */
    @Override
    public IPage<WxUser> queryPage(WxUser wxUser) {
        String openid = wxUser.getOpenid();
        String nickname = wxUser.getNickname();
        String appid = wxUser.getAppid();
        String city = wxUser.getCity();
        String qrSceneStr = wxUser.getQrSceneStr();

        QueryWrapper<WxUser> filter = new QueryWrapper<WxUser>()
                .eq(!StringUtils.isEmpty(appid), "appid", appid)
                .eq(!StringUtils.isEmpty(openid), "openid", openid)
                .like(!StringUtils.isEmpty(nickname), "nickname", nickname)
                .eq(!StringUtils.isEmpty(city), "city", city)
                .eq(!StringUtils.isEmpty(qrSceneStr), "qrSceneStr", qrSceneStr);

        return this.baseMapper.selectPage(new Page<>(wxUser.getCurrent(), wxUser.getSize()), filter);
    }

    /**
     * Fetches one follower's profile from WeChat and stores it locally.
     *
     * <p>This is best effort: any failure is logged (with its stack trace) and reported to the
     * caller as {@code null} instead of being rethrown.
     *
     * @param openid the follower's openid
     * @param appid  the official account to query
     * @return the stored user, or {@code null} when the profile could not be fetched or saved
     */
    @Override
    public WxUser refreshUserInfo(String openid, String appid) {
        try {
            // Fetch the follower's basic profile from WeChat.
            logger.info("更新用户信息，openid={}", openid);
            wxMpService.switchover(appid);
            WxMpUser userWxInfo = wxMpService.getUserService().userInfo(openid, null);
            if (userWxInfo == null) {
                logger.error("获取不到用户信息，无法更新,openid:{}", openid);
                return null;
            }
            WxUser user = new WxUser(userWxInfo, appid);
            this.updateOrInsert(user);
            return user;
        } catch (Exception e) {
            // Same message text as before, but the exception is also passed as the trailing
            // argument so the stack trace is no longer lost.
            logger.error("更新用户信息失败,openid:{},异常信息：{}", openid, e.getMessage(), e);
        }
        return null;
    }

    /**
     * Submits one {@link #refreshUserInfo(String, String)} task per openid to {@link TaskExcutor}.
     *
     * @param appid      the official account the openids belong to
     * @param openidList openids to refresh; {@code null} is treated as empty
     */
    @Override
    public void refreshUserInfoAsync(String appid, String[] openidList) {
        logger.info("批量更新用户信息：任务开始");
        if (openidList != null) {
            for (String openid : openidList) {
                wxMpService.switchover(appid);
                TaskExcutor.submit(() -> this.refreshUserInfo(openid, appid));
            }
        }
        logger.info("批量更新用户信息：任务全部添加到线程池");
    }

    /**
     * Updates the user by id and falls back to an insert when no row was updated.
     *
     * @param user the user to persist
     */
    @Override
    public void updateOrInsert(WxUser user) {
        int updateCount = this.baseMapper.updateById(user);
        if (updateCount < 1) {
            this.baseMapper.insert(user);
        }
    }

    /**
     * Delegates to {@link WxUserMapper#unsubscribe(String)} for the given openid.
     *
     * @param openid the follower's openid
     */
    @Override
    public void unsubscribe(String openid) {
        userMapper.unsubscribe(openid);
    }

    /**
     * Synchronises the complete follower list of an official account.
     *
     * <p>One call to WeChat returns at most 10000 follower OpenIDs; further pages are fetched by
     * passing the returned {@code nextOpenid}. Synchronisation is slow, so a second invocation
     * that arrives while one is in progress returns immediately.
     *
     * <p>The running flag is always cleared in a {@code finally} block; otherwise an unexpected
     * runtime failure would block every later synchronisation. Note that the flag covers only
     * listing and dispatching: batches handed to {@link TaskExcutor} are not awaited here.
     *
     * @param appid the official account to synchronise
     */
    @Override
    @Async
    public void syncWxUsers(String appid) {
        // Atomic check-and-set: prevents several threads from starting the sync at once.
        if (!SYNC_WX_USER_TASK_RUNNING.compareAndSet(false, true)) {
            return;
        }
        try {
            logger.info("同步公众号粉丝列表：任务开始");
            wxMpService.switchover(appid);
            try {
                pullAndDispatchFollowers(appid);
            } catch (WxErrorException e) {
                logger.error("同步公众号粉丝出错:", e);
            }
            logger.info("同步公众号粉丝列表：完成");
        } finally {
            SYNC_WX_USER_TASK_RUNNING.set(false);
        }
    }

    /**
     * Pages through the follower openid list and hands every page to
     * {@link #syncWxUsers(String, List)}.
     */
    private void pullAndDispatchFollowers(String appid) throws WxErrorException {
        WxMpUserService wxMpUserService = wxMpService.getUserService();
        String nextOpenid = null;
        boolean hasMore = true;
        int page = 1;
        while (hasMore) {
            // Pull one page of openids, at most 10000 per call.
            WxMpUserList wxMpUserList = wxMpUserService.userList(nextOpenid);
            logger.info("拉取openid列表：第{}页，数量：{}", page++, wxMpUserList.getCount());
            this.syncWxUsers(appid, wxMpUserList.getOpenids());
            nextOpenid = wxMpUserList.getNextOpenid();
            // A page that is not full, or a missing cursor, means the list is exhausted.
            hasMore = !StringUtils.isEmpty(nextOpenid) && wxMpUserList.getCount() >= USER_LIST_PAGE_SIZE;
        }
    }

    /**
     * Synchronises the given followers in batches of at most 100 openids, each batch being
     * submitted to {@link TaskExcutor} because syncing many followers sequentially is slow.
     *
     * @param appid   the official account the openids belong to
     * @param openids openids to synchronise; {@code null} or empty is a no-op
     */
    @Override
    public void syncWxUsers(String appid, List<String> openids) {
        if (openids == null || openids.isEmpty()) {
            return;
        }
        final String batch = batchLabel(openids.get(0));
        final WxMpUserService wxMpUserService = wxMpService.getUserService();
        final int batchSize = openids.size();
        logger.info("开始处理批次：{}，批次数量：{}", batch, batchSize);
        // Process in slices; each userInfoList call accepts at most 100 users.
        for (int start = 0; start < batchSize; start += USER_INFO_BATCH_SIZE) {
            final int finalStart = start;
            final int finalEnd = Math.min(start + USER_INFO_BATCH_SIZE, batchSize);
            final List<String> subOpenids = openids.subList(finalStart, finalEnd);
            TaskExcutor.submit(() -> {
                syncBatch(appid, wxMpUserService, subOpenids, batch, finalStart, finalEnd);
            });
        }
        logger.info("批次：{}处理完成", batch);
    }

    /**
     * Builds the batch label from part of the first openid. The label appears only in log
     * output; ids shorter than the offset are used whole instead of failing in substring.
     */
    private static String batchLabel(String firstOpenid) {
        if (firstOpenid == null || firstOpenid.length() < BATCH_LABEL_OFFSET) {
            return String.valueOf(firstOpenid);
        }
        return firstOpenid.substring(BATCH_LABEL_OFFSET);
    }

    /** Fetches the profiles of one slice of openids and saves them; failures are logged only. */
    private void syncBatch(String appid, WxMpUserService wxMpUserService, List<String> subOpenids,
                           String batch, int start, int end) {
        logger.info("同步批次:【{}--{}-{}】，数量：{}", batch, start, end, subOpenids.size());
        wxMpService.switchover(appid);
        List<WxMpUser> wxMpUsers;
        try {
            // Fetch user profiles in bulk, at most 100 per call.
            wxMpUsers = wxMpUserService.userInfoList(subOpenids);
        } catch (WxErrorException e) {
            // The message fills the last placeholder; the trailing exception supplies the stack trace.
            logger.error("同步出错，批次：【{}--{}-{}】，错误信息：{}", batch, start, end, e.getMessage(), e);
            return;
        }
        if (wxMpUsers == null || wxMpUsers.isEmpty()) {
            return;
        }
        // A sequential stream is enough for a slice of at most 100 elements.
        List<WxUser> wxUsers = wxMpUsers.stream()
                .map(item -> new WxUser(item, appid))
                .collect(Collectors.toList());
        this.saveOrUpdateBatch(wxUsers);
    }

    /**
     * Looks up a follower by openid.
     *
     * @param openid the follower's openid
     * @return the result of the underlying {@code getOne} query on the {@code openid} column
     */
    @Override
    public WxUser getById(String openid) {
        return super.getOne(new QueryWrapper<WxUser>().eq("openid", openid));
    }

}
